adbe8850f5e14dae21bb65e53586048b6bc3f2dd,cdap-app-templates/cdap-etl/cdap-etl-core/src/main/java/co/cask/cdap/etl/planner/PipelinePlanner.java,PipelinePlanner,dagToPipeline,#Dag#Set#Map#,246
Before Change
// add other plugin types
StageSpec spec = specs.get(stageName);
String pluginType = spec.getPlugin().getType();
StageInfo stageInfo = new StageInfo(stageName, spec.getInputs(), spec.getInputSchemas(), spec.getOutputs(),
spec.getOutputSchema(), spec.getErrorDatasetName());
phaseBuilder.addStage(pluginType, stageInfo);
}
After Change
private PipelinePhase dagToPipeline(Dag dag, Set<String> connectors, Map<String, StageSpec> specs) {
PipelinePhase.Builder phaseBuilder = PipelinePhase.builder(supportedPluginTypes);
for (String stageName : dag.getTopologicalOrder()) {
Set<String> outputs = dag.getNodeOutputs(stageName);
if (!outputs.isEmpty()) {
phaseBuilder.addConnections(stageName, outputs);
}
// add connectors
if (connectors.contains(stageName)) {
phaseBuilder.addStage(StageInfo.builder(stageName, Constants.CONNECTOR_TYPE).build());
continue;
}
// add other plugin types
StageSpec spec = specs.get(stageName);
String pluginType = spec.getPlugin().getType();
phaseBuilder.addStage(StageInfo.builder(stageName, pluginType)
.addInputs(spec.getInputs())
.addInputSchemas(spec.getInputSchemas())
.addOutputs(spec.getOutputs())
.setOutputSchema(spec.getOutputSchema())